/**
 * Copyright (c) 2018, 西安星沙网络科技-版权所有
 *
 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.gnu.org/licenses/lgpl-3.0.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.waleychain.exchange.service.impl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;

import cn.waleychain.exchange.core.logger.LoggerHelper;
import cn.waleychain.exchange.core.utils.StringUtils;
import cn.waleychain.exchange.service.RedisService;

/**
 * Redis-backed implementation of {@link RedisService}.
 *
 * <p>Most operations take a {@code cacheName} and a {@code key}. When {@code cacheName}
 * is not blank the Redis key is {@code cacheName + key}; otherwise it is {@code key} alone.
 * The hash operations ({@code hkey}, {@code hget}, {@code hset}, {@code hdel}) are the
 * exception: they use {@code cacheName} itself as the Redis key of the hash.
 *
 * <p>Plain value operations go through the template's configured serializers. List and
 * hash operations talk to the connection directly, using the template's string serializer
 * and storing values as fastjson JSON text.
 */
@Service
public class RedisServiceImpl implements RedisService {

	private static final Logger mLog = LoggerFactory.getLogger(RedisServiceImpl.class);

	/**
	 * Default time to keep retrying for a distributed lock, in milliseconds.
	 * Overridden by the {@code lockWaitMills} environment property when present.
	 */
	private static final int DEFAULT_LOCK_WAIT_MILLIS = 10000;

	/**
	 * Default time-to-live of a distributed lock, in seconds.
	 * Overridden by the {@code lockExpireSecs} environment property when present.
	 */
	private static final int DEFAULT_LOCK_EXPIRE_SECS = 120;

	@Resource
	private RedisTemplate<String, Object> redisTemplate;

	@Autowired
	private Environment env;

	/**
	 * Builds the Redis key for a cache entry.
	 *
	 * @param cacheName optional prefix; ignored when blank
	 * @param key       entry key
	 * @return {@code cacheName + key} when {@code cacheName} is not blank, otherwise {@code key}
	 */
	private String buildKey(String cacheName, String key) {
		return StringUtils.isNotBlank(cacheName) ? cacheName + key : key;
	}

	/**
	 * Checks whether an entry exists.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 * @return the result of {@code RedisTemplate.hasKey} for the composed key
	 */
	@Override
	public Boolean hasKey(String cacheName, String key) {
		return redisTemplate.hasKey(buildKey(cacheName, key));
	}

	/**
	 * Stores a value without an expiry.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 * @param val       value to store
	 */
	@Override
	public void set(String cacheName, String key, Object val) {
		redisTemplate.opsForValue().set(buildKey(cacheName, key), val);
	}

	/**
	 * Stores a value that expires after the given duration.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 * @param val       value to store
	 * @param expires   time-to-live, expressed in {@code timeUnit}
	 * @param timeUnit  unit of {@code expires}
	 */
	@Override
	public void set(String cacheName, String key, Object val, Long expires, TimeUnit timeUnit) {
		redisTemplate.opsForValue().set(buildKey(cacheName, key), val, expires, timeUnit);
	}

	/**
	 * Deletes an entry. Deleting a key that does not exist is a no-op in Redis, so no
	 * existence check is made first.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 */
	@Override
	public void delete(String cacheName, String key) {
		redisTemplate.delete(buildKey(cacheName, key));
	}

	/**
	 * Reads a value.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 * @return whatever {@code opsForValue().get} returns for the composed key
	 */
	@Override
	public Object get(String cacheName, String key) {
		return redisTemplate.opsForValue().get(buildKey(cacheName, key));
	}

	/**
	 * Tries to acquire a distributed lock by setting {@code prefix + key} only if it is absent,
	 * then giving the key a time-to-live.
	 *
	 * <p>The wait budget and the time-to-live are read on every call from the environment
	 * properties {@code lockWaitMills} and {@code lockExpireSecs}, falling back to the class
	 * defaults.
	 *
	 * <p>NOTE(review): set-if-absent and expire are two separate commands. If the process dies
	 * between them the key is left without a time-to-live. A single atomic
	 * "set if absent with expiry" command would close that gap, if the Spring Data Redis
	 * version in use offers one.
	 *
	 * @param prefix   lock key prefix, concatenated as-is
	 * @param key      lock name; a blank value makes the call return {@code false}
	 * @param value    value stored under the lock key; a blank value makes the call return {@code false}
	 * @param waitLock {@code true} to keep retrying until the wait budget is used up,
	 *                 {@code false} to give up after the first failed attempt
	 * @return {@code true} if the lock was acquired; {@code false} if it is held elsewhere,
	 *         the wait budget ran out, or the waiting thread was interrupted
	 * @throws NumberFormatException if either property is set to a non-integer value
	 */
	@Override
	public boolean lock(String prefix, String key, String value, boolean waitLock) {

		if (StringUtils.isBlank(key) || StringUtils.isBlank(value)) {
			return false;
		}

		// Read into locals: this bean is shared, so per-call settings must not be written back to fields.
		final int waitMillis = Integer.parseInt(
				env.getProperty("lockWaitMills", String.valueOf(DEFAULT_LOCK_WAIT_MILLIS)));
		final int expireSecs = Integer.parseInt(
				env.getProperty("lockExpireSecs", String.valueOf(DEFAULT_LOCK_EXPIRE_SECS)));

		final long deadline = System.currentTimeMillis() + waitMillis;
		final String lockKey = generateLockKey(prefix, key);

		// Boolean.TRUE.equals treats a null reply as "not acquired" instead of failing on unboxing.
		while (!Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, value))) {
			if (!waitLock || System.currentTimeMillis() >= deadline) {
				return false;
			}

			try {
				// Brief pause between attempts.
				Thread.sleep(0L, 1000);
			} catch (InterruptedException e) {
				// Preserve the interrupt for the caller and stop waiting; the lock was not acquired.
				Thread.currentThread().interrupt();
				return false;
			}
		}

		try {
			redisTemplate.expire(lockKey, expireSecs, TimeUnit.SECONDS);
		} catch (RuntimeException e) {
			// Without a time-to-live the key would block every other caller indefinitely,
			// so release it (best effort) before reporting the failure.
			try {
				redisTemplate.delete(lockKey);
			} catch (RuntimeException cleanupFailure) {
				e.addSuppressed(cleanupFailure);
			}
			throw e;
		}

		return true;
	}

	/**
	 * Tries to acquire a distributed lock, storing {@code key} itself as the lock value.
	 *
	 * @param prefix   lock key prefix
	 * @param key      lock name
	 * @param waitLock whether to keep retrying until the wait budget is used up
	 * @return {@code true} if the lock was acquired
	 */
	@Override
	public boolean lock(String prefix, String key, boolean waitLock) {
		return lock(prefix, key, key, waitLock);
	}

	/**
	 * Builds the Redis key of a lock by plain concatenation. Unlike {@link #buildKey},
	 * no blank check is applied to the prefix.
	 *
	 * @param prefix lock key prefix
	 * @param key    lock name
	 * @return {@code prefix + key}
	 */
	private String generateLockKey(String prefix, String key) {
		return prefix + key;
	}

	/**
	 * Releases a distributed lock by deleting {@code prefix + key}. Any failure is logged
	 * and not rethrown.
	 *
	 * <p>The stored lock value is not inspected, so this removes the key regardless of
	 * which caller set it.
	 *
	 * @param prefix lock key prefix
	 * @param key    lock name
	 */
	@Override
	public void unlock(String prefix, String key) {
		try {
			// Deleting a missing key is a no-op, so no existence check is needed.
			redisTemplate.delete(generateLockKey(prefix, key));
		} catch (Exception e) {
			LoggerHelper.printLogErrorNotThrows(mLog, e, "[Redis分布式锁] 解锁出现异常了");
		}
	}

	/**
	 * Appends a value, serialized as JSON text, to the tail of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @param val       value to serialize with fastjson and push
	 */
	@Override
	public void rpush(final String cacheName, final String key, Object val) {
		final String redisKey = buildKey(cacheName, key);
		final String json = JSONObject.toJSONString(val);
		redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				return connection.rPush(serializer.serialize(redisKey), serializer.serialize(json));
			}
		});
	}

	/**
	 * Prepends a value, serialized as JSON text, to the head of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @param val       value to serialize with fastjson and push
	 */
	@Override
	public void lpush(final String cacheName, final String key, Object val) {
		final String redisKey = buildKey(cacheName, key);
		final String json = JSONObject.toJSONString(val);
		redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				return connection.lPush(serializer.serialize(redisKey), serializer.serialize(json));
			}
		});
	}

	/**
	 * Removes and returns the element at the head of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @return the popped bytes decoded with the template's string serializer
	 */
	@Override
	public String lpop(final String cacheName, final String key) {
		final String redisKey = buildKey(cacheName, key);
		return redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				return serializer.deserialize(connection.lPop(serializer.serialize(redisKey)));
			}
		});
	}

	/**
	 * Lists the field names of a hash.
	 *
	 * @param cacheName Redis key of the hash
	 * @return the field names decoded as strings, or {@code null} when the connection
	 *         returns {@code null}
	 */
	@Override
	public Set<String> hkey(final String cacheName) {
		return redisTemplate.execute(new RedisCallback<Set<String>>() {
			@Override
			public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				Set<byte[]> rawFields = connection.hKeys(serializer.serialize(cacheName));
				if (rawFields == null) {
					return null;
				}

				Set<String> fields = new HashSet<>(rawFields.size());
				for (byte[] rawField : rawFields) {
					fields.add(serializer.deserialize(rawField));
				}
				return fields;
			}
		});
	}

	/**
	 * Reads one field of a hash.
	 *
	 * @param cacheName Redis key of the hash
	 * @param key       field name
	 * @return the field value decoded as a string, or {@code null} when the connection
	 *         returns {@code null}
	 */
	@Override
	public String hget(final String cacheName, final String key) {
		return redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				byte[] raw = connection.hGet(serializer.serialize(cacheName), serializer.serialize(key));
				return raw == null ? null : serializer.deserialize(raw);
			}
		});
	}

	/**
	 * Writes one field of a hash, storing the value as JSON text.
	 *
	 * @param cacheName Redis key of the hash
	 * @param key       field name
	 * @param val       value to serialize with fastjson and store
	 */
	@Override
	public void hset(final String cacheName, final String key, Object val) {
		final String json = JSONObject.toJSONString(val);
		redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				connection.hSet(serializer.serialize(cacheName), serializer.serialize(key),
						serializer.serialize(json));
				return null;
			}
		});
	}

	/**
	 * Removes one field from a hash.
	 *
	 * @param cacheName Redis key of the hash
	 * @param key       field name
	 */
	@Override
	public void hdel(final String cacheName, final String key) {
		redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				connection.hDel(serializer.serialize(cacheName), serializer.serialize(key));
				return null;
			}
		});
	}

	/**
	 * Removes and returns the element at the tail of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @return the popped bytes decoded with the template's string serializer
	 */
	@Override
	public String rpop(final String cacheName, final String key) {
		final String redisKey = buildKey(cacheName, key);
		return redisTemplate.execute(new RedisCallback<String>() {
			@Override
			public String doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				return serializer.deserialize(connection.rPop(serializer.serialize(redisKey)));
			}
		});
	}

	/**
	 * Reads a range of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @param start     start index passed to the Redis range command
	 * @param end       end index passed to the Redis range command
	 * @return the elements decoded as strings, or {@code null} when the connection
	 *         returns {@code null}
	 */
	@Override
	public List<String> lrange(final String cacheName, final String key, final long start, final long end) {
		final String redisKey = buildKey(cacheName, key);
		return redisTemplate.execute(new RedisCallback<List<String>>() {
			@Override
			public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				List<byte[]> rawItems = connection.lRange(serializer.serialize(redisKey), start, end);
				if (rawItems == null) {
					return null;
				}

				List<String> items = new ArrayList<>(rawItems.size());
				for (byte[] rawItem : rawItems) {
					items.add(serializer.deserialize(rawItem));
				}
				return items;
			}
		});
	}

	/**
	 * Returns the length of a list.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @return the length reported by the connection, or {@code 0} when it reports {@code null}
	 */
	@Override
	public long llen(final String cacheName, final String key) {
		final String redisKey = buildKey(cacheName, key);
		Long length = redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				return connection.lLen(serializer.serialize(redisKey));
			}
		});

		// Guard the unboxing: a null reply would otherwise surface as a NullPointerException.
		return length == null ? 0L : length.longValue();
	}

	/**
	 * Overwrites the element at index 0 of a list with the value serialized as JSON text.
	 *
	 * @param cacheName optional key prefix
	 * @param key       list key
	 * @param val       value to serialize with fastjson and store
	 */
	@Override
	public void lset(final String cacheName, final String key, Object val) {
		final String redisKey = buildKey(cacheName, key);
		final String json = JSONObject.toJSONString(val);
		redisTemplate.execute(new RedisCallback<Long>() {
			@Override
			public Long doInRedis(RedisConnection connection) throws DataAccessException {
				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
				connection.lSet(serializer.serialize(redisKey), 0L, serializer.serialize(json));
				return 0L;
			}
		});
	}

	/**
	 * Sets the time-to-live of an entry.
	 *
	 * @param cacheName optional key prefix
	 * @param key       entry key
	 * @param timeout   time-to-live, expressed in {@code unit}
	 * @param unit      unit of {@code timeout}
	 * @return the result of {@code RedisTemplate.expire} for the composed key
	 */
	@Override
	public Boolean expire(String cacheName, String key, long timeout, TimeUnit unit) {
		return redisTemplate.expire(buildKey(cacheName, key), timeout, unit);
	}

}
